Today we're making our AI assistant faster and more efficient! With parallel processing and performance-optimization techniques, the system can run multiple tasks at once, cutting processing time dramatically and improving the user experience.
In real-world applications, many tasks are independent of one another and can run at the same time. Parallel processing turns work that would otherwise execute one step after another into concurrent execution, which shortens total runtime substantially.
Here's the project structure for today:
parallel_processing/
├── main.py                      # Main program
├── core/
│   ├── __init__.py
│   ├── parallel_executor.py     # Parallel executor
│   └── task_pool.py             # Task pool
├── optimizers/
│   ├── __init__.py
│   ├── cache_manager.py         # Cache manager
│   └── batch_processor.py       # Batch processor
├── workflows/
│   ├── __init__.py
│   └── parallel_workflow.py     # Parallel workflow
└── utils/
    ├── __init__.py
    └── performance_monitor.py   # Performance monitor
# core/parallel_executor.py
import concurrent.futures
from typing import List, Dict, Any, Callable
import time
from dataclasses import dataclass, field

@dataclass
class ParallelTask:
    """A unit of work to run in parallel."""
    id: str
    name: str
    function: Callable
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)  # avoids the mutable-default pitfall
    timeout: float = 30.0
class ParallelExecutor:
    """Runs ParallelTask objects concurrently on a thread pool."""

    def __init__(self, max_workers: int = 5):
        self.max_workers = max_workers
        self.execution_stats = {
            'total_tasks': 0,
            'successful_tasks': 0,
            'failed_tasks': 0,
            'total_time': 0.0
        }
    def execute_parallel(self, tasks: List[ParallelTask]) -> Dict[str, Any]:
        """Execute multiple tasks in parallel and collect their results."""
        if not tasks:
            return {'success': True, 'results': {}}
        start_time = time.time()
        results = {}
        errors = {}
        print(f"🚀 Starting parallel execution of {len(tasks)} task(s)...")
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit every task to the pool
            future_to_task = {
                executor.submit(self._execute_single_task, task): task
                for task in tasks
            }
            # Collect results. We wait on each future directly with its task's
            # timeout; with as_completed() a yielded future is already done, so
            # result(timeout=...) could never actually raise TimeoutError there.
            for future, task in future_to_task.items():
                try:
                    result = future.result(timeout=task.timeout)
                    results[task.id] = result
                    self.execution_stats['successful_tasks'] += 1
                    print(f"✅ Task finished: {task.name}")
                except concurrent.futures.TimeoutError:
                    errors[task.id] = f"Task timed out (>{task.timeout}s)"
                    self.execution_stats['failed_tasks'] += 1
                    print(f"⏰ Task timed out: {task.name}")
                except Exception as e:
                    errors[task.id] = str(e)
                    self.execution_stats['failed_tasks'] += 1
                    print(f"❌ Task failed: {task.name} - {e}")
        end_time = time.time()
        execution_time = end_time - start_time
        self.execution_stats['total_tasks'] += len(tasks)
        self.execution_stats['total_time'] += execution_time
        print(f"⏱️ Parallel execution finished in {execution_time:.2f}s")
        return {
            'success': len(errors) == 0,
            'results': results,
            'errors': errors,
            'execution_time': execution_time,
            'tasks_count': len(tasks),
            'success_rate': len(results) / len(tasks)
        }
    def _execute_single_task(self, task: ParallelTask) -> Any:
        """Run a single task, wrapping any error with context."""
        try:
            return task.function(*task.args, **task.kwargs)
        except Exception as e:
            raise RuntimeError(f"Task execution failed: {e}") from e

    def get_stats(self) -> Dict[str, Any]:
        """Return aggregate execution statistics."""
        total = self.execution_stats['total_tasks']
        return {
            **self.execution_stats,
            'success_rate': (
                self.execution_stats['successful_tasks'] / total
                if total > 0 else 0
            ),
            'average_time': (
                self.execution_stats['total_time'] / total
                if total > 0 else 0
            )
        }
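Before wiring the executor into a workflow, here's a minimal usage sketch. Note that fetch_data and its sleep timings are made-up stand-ins for real I/O-bound work (API calls, file reads), and the import assumes you run from the project root shown above:

# Hypothetical usage sketch for ParallelExecutor; fetch_data simulates I/O-bound work.
import time
from core.parallel_executor import ParallelExecutor, ParallelTask

def fetch_data(source: str, delay: float) -> str:
    time.sleep(delay)  # simulate network latency
    return f"data from {source}"

executor = ParallelExecutor(max_workers=3)
tasks = [
    ParallelTask(id="t1", name="fetch A", function=fetch_data, args=("A", 1.0)),
    ParallelTask(id="t2", name="fetch B", function=fetch_data, args=("B", 1.2)),
    ParallelTask(id="t3", name="fetch C", function=fetch_data, args=("C", 0.8)),
]
outcome = executor.execute_parallel(tasks)
print(outcome['results'])  # finishes in ~1.2s instead of ~3.0s sequentially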
# optimizers/cache_manager.py
from typing import Any, Optional, Callable
import time
import hashlib
import json
import threading
from functools import wraps

class CacheManager:
    """A simple in-memory TTL cache. A lock makes it safe to share
    across the worker threads used by ParallelExecutor."""

    def __init__(self, default_ttl: int = 300):
        self.cache = {}
        self.default_ttl = default_ttl  # default time-to-live in seconds
        self.hit_count = 0
        self.miss_count = 0
        self._lock = threading.Lock()
    def get(self, key: str) -> Optional[Any]:
        """Fetch a value from the cache, or None on a miss."""
        with self._lock:
            if key in self.cache:
                entry = self.cache[key]
                # Still fresh?
                if time.time() < entry['expires_at']:
                    self.hit_count += 1
                    print(f"💾 Cache hit: {key}")
                    return entry['value']
                # Expired: drop the entry
                del self.cache[key]
            self.miss_count += 1
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store a value with a time-to-live."""
        ttl = ttl or self.default_ttl
        with self._lock:
            self.cache[key] = {
                'value': value,
                'created_at': time.time(),
                'expires_at': time.time() + ttl
            }
        print(f"💾 Cache set: {key} (TTL: {ttl}s)")
    def invalidate(self, key: str) -> None:
        """Remove a single cache entry."""
        with self._lock:
            if key in self.cache:
                del self.cache[key]
                print(f"🗑️ Cache invalidated: {key}")

    def clear_all(self) -> None:
        """Remove every cache entry."""
        with self._lock:
            self.cache.clear()
        print("🗑️ All cache entries cleared")

    def get_stats(self) -> dict:
        """Return hit/miss statistics."""
        with self._lock:
            total_requests = self.hit_count + self.miss_count
            hit_rate = self.hit_count / total_requests if total_requests > 0 else 0
            return {
                'cache_size': len(self.cache),
                'hit_count': self.hit_count,
                'miss_count': self.miss_count,
                'hit_rate': hit_rate
            }
    def cached(self, ttl: Optional[int] = None):
        """Decorator that caches a function's return value."""
        def decorator(func: Callable):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Build a cache key from the function name and arguments
                cache_key = self._generate_cache_key(func.__name__, args, kwargs)
                # Try the cache first
                cached_value = self.get(cache_key)
                if cached_value is not None:
                    return cached_value
                # Miss: run the function and cache the result
                result = func(*args, **kwargs)
                self.set(cache_key, result, ttl)
                return result
            return wrapper
        return decorator

    def _generate_cache_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Derive a stable cache key from the call signature."""
        key_data = {
            'func': func_name,
            'args': str(args),
            'kwargs': str(sorted(kwargs.items()))
        }
        key_str = json.dumps(key_data, sort_keys=True)
        # Hash it so keys stay short and uniform
        return hashlib.md5(key_str.encode()).hexdigest()
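The cached decorator is the most convenient entry point. A minimal sketch of how it might be used (slow_square is a made-up stand-in for any expensive, repeatable computation):

# Hypothetical usage of the cached decorator; slow_square stands in for expensive work.
import time
from optimizers.cache_manager import CacheManager

cache = CacheManager(default_ttl=60)

@cache.cached(ttl=120)
def slow_square(n: int) -> int:
    time.sleep(1)  # pretend this is expensive
    return n * n

print(slow_square(7))  # ~1s: cache miss, the function actually runs
print(slow_square(7))  # near-instant: served from the cache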
# optimizers/batch_processor.py
from typing import List, Any, Callable, Dict
import google.generativeai as genai
import os
import time

genai.configure(api_key=os.getenv('GEMINI_API_KEY'))

class BatchProcessor:
    """Processes items in fixed-size batches."""

    def __init__(self, batch_size: int = 5):
        self.batch_size = batch_size
        self.model = genai.GenerativeModel('gemini-2.0-flash-exp')
    def process_batch(self, items: List[Any],
                      processor_func: Callable,
                      show_progress: bool = True) -> List[Dict[str, Any]]:
        """Process items in batches, recording per-item success or failure."""
        results = []
        total = len(items)
        print(f"📦 Starting batch processing of {total} item(s) (batch size: {self.batch_size})")
        for i in range(0, total, self.batch_size):
            batch = items[i:i + self.batch_size]
            batch_num = i // self.batch_size + 1
            total_batches = (total + self.batch_size - 1) // self.batch_size  # ceiling division
            if show_progress:
                print(f"🔄 Processing batch {batch_num}/{total_batches}...")
            batch_start = time.time()
            for item in batch:
                try:
                    result = processor_func(item)
                    results.append({
                        'item': item,
                        'result': result,
                        'success': True
                    })
                except Exception as e:
                    results.append({
                        'item': item,
                        'error': str(e),
                        'success': False
                    })
            batch_time = time.time() - batch_start
            if show_progress:
                print(f"✅ Batch {batch_num} done ({batch_time:.2f}s)")
            # Brief pause between batches to avoid API rate limits
            if i + self.batch_size < total:
                time.sleep(0.5)
        success_count = sum(1 for r in results if r['success'])
        print(f"📊 Batch processing complete: {success_count}/{total} succeeded")
        return results
    def batch_analyze_texts(self, texts: List[str],
                            analysis_type: str = "sentiment") -> List[Dict]:
        """Run a simple LLM analysis over a list of texts, in batches."""
        def analyze_single(text: str) -> Dict:
            prompts = {
                "sentiment": f"Classify the sentiment of this text (positive/neutral/negative): {text}",
                "summary": f"Summarize this in one sentence: {text}",
                "category": f"Categorize this text: {text}"
            }
            prompt = prompts.get(analysis_type, prompts["sentiment"])
            try:
                response = self.model.generate_content(prompt)
                return {'analysis': response.text}
            except Exception as e:
                return {'error': str(e)}
        return self.process_batch(texts, analyze_single)
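A quick way to try it, assuming a valid GEMINI_API_KEY is set; the sample texts here are arbitrary:

# Hypothetical smoke test for BatchProcessor; requires GEMINI_API_KEY.
from optimizers.batch_processor import BatchProcessor

processor = BatchProcessor(batch_size=2)
texts = [
    "This product is amazing!",
    "The delivery took forever.",
    "It works as expected.",
]
for r in processor.batch_analyze_texts(texts, analysis_type="sentiment"):
    # Each entry records the item plus either a result or an error
    print(r['item'], '->', r.get('result', r.get('error')))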
# workflows/parallel_workflow.py
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Dict, Any
from core.parallel_executor import ParallelExecutor, ParallelTask
from optimizers.cache_manager import CacheManager
import google.generativeai as genai
import os
import time

genai.configure(api_key=os.getenv('GEMINI_API_KEY'))

class ParallelWorkflowState(TypedDict):
    user_queries: List[str]
    parallel_results: Dict[str, Any]
    aggregated_response: str
    execution_time: float

class ParallelWorkflow:
    """A LangGraph workflow that fans queries out in parallel, then aggregates."""

    def __init__(self):
        self.executor = ParallelExecutor(max_workers=5)
        self.cache = CacheManager(default_ttl=300)
        self.model = genai.GenerativeModel('gemini-2.5-flash')

    def create_workflow(self):
        """Build the two-node graph: parallel processing, then aggregation."""
        workflow = StateGraph(ParallelWorkflowState)
        workflow.add_node("process_parallel", self.process_queries_parallel)
        workflow.add_node("aggregate_results", self.aggregate_results_node)
        workflow.set_entry_point("process_parallel")
        workflow.add_edge("process_parallel", "aggregate_results")
        workflow.add_edge("aggregate_results", END)
        return workflow.compile()
    def process_queries_parallel(self, state: ParallelWorkflowState) -> ParallelWorkflowState:
        """Fan all user queries out to the parallel executor."""
        queries = state["user_queries"]
        print(f"\n🚀 Preparing to process {len(queries)} queries in parallel...")
        # Build one ParallelTask per query
        tasks = []
        for i, query in enumerate(queries):
            task = ParallelTask(
                id=f"query_{i}",
                name=f"Query: {query[:30]}...",
                function=self.process_single_query_with_cache,
                args=(query,),
                timeout=15.0
            )
            tasks.append(task)
        # Run them all at once
        start_time = time.time()
        execution_result = self.executor.execute_parallel(tasks)
        execution_time = time.time() - start_time
        return {
            **state,
            "parallel_results": execution_result['results'],
            "execution_time": execution_time
        }
    def process_single_query_with_cache(self, query: str) -> str:
        """Answer a single query, consulting the cache first."""
        cached_result = self.cache.get(f"query_{query}")
        if cached_result is not None:
            return cached_result
        # Cache miss: call the model
        try:
            response = self.model.generate_content(query)
            result = response.text
            self.cache.set(f"query_{query}", result, ttl=300)
            return result
        except Exception as e:
            return f"Processing failed: {e}"
    def aggregate_results_node(self, state: ParallelWorkflowState) -> ParallelWorkflowState:
        """Combine the per-query results into one report."""
        results = state["parallel_results"]
        queries = state["user_queries"]
        print("\n📊 Aggregating results...")
        aggregated_text = "🎯 **Parallel Processing Results**\n\n"
        for i, query in enumerate(queries):
            query_id = f"query_{i}"
            result = results.get(query_id, "no result")
            aggregated_text += f"**Question {i+1}:** {query}\n"
            aggregated_text += f"**Answer:** {result[:200]}{'...' if len(result) > 200 else ''}\n\n"
            aggregated_text += "-" * 50 + "\n\n"
        # Append performance information
        execution_time = state["execution_time"]
        aggregated_text += f"\n⏱️ Total execution time: {execution_time:.2f}s\n"
        aggregated_text += f"📈 Average per query: {execution_time/len(queries):.2f}s\n"
        # Cache statistics
        cache_stats = self.cache.get_stats()
        aggregated_text += f"💾 Cache hit rate: {cache_stats['hit_rate']:.1%}\n"
        return {
            **state,
            "aggregated_response": aggregated_text
        }
# utils/performance_monitor.py
import time
from functools import wraps
from typing import Callable, Dict, Any, Optional
import statistics

class PerformanceMonitor:
    """Collects per-function timing metrics via a decorator."""

    def __init__(self):
        self.metrics = {}

    def measure_time(self, func_name: Optional[str] = None):
        """Decorator that records each call's wall-clock duration."""
        def decorator(func: Callable):
            name = func_name or func.__name__
            if name not in self.metrics:
                self.metrics[name] = []
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                result = func(*args, **kwargs)
                end_time = time.time()
                execution_time = end_time - start_time
                self.metrics[name].append(execution_time)
                print(f"⏱️ {name} took {execution_time:.3f}s")
                return result
            return wrapper
        return decorator
    def get_statistics(self, func_name: Optional[str] = None) -> Dict[str, Any]:
        """Return timing statistics for one function, or for all of them."""
        if func_name:
            if func_name not in self.metrics or not self.metrics[func_name]:
                return {}
            times = self.metrics[func_name]
            return self._calculate_stats(func_name, times)
        # No name given: report every monitored function
        all_stats = {}
        for name, times in self.metrics.items():
            if times:
                all_stats[name] = self._calculate_stats(name, times)
        return all_stats

    def _calculate_stats(self, name: str, times: list) -> Dict[str, float]:
        """Compute summary statistics for a list of durations."""
        return {
            'function': name,
            'call_count': len(times),
            'total_time': sum(times),
            'avg_time': statistics.mean(times),
            'min_time': min(times),
            'max_time': max(times),
            'median_time': statistics.median(times)
        }
    def print_report(self):
        """Print a timing report for every monitored function."""
        print("\n📊 Performance Report")
        print("=" * 60)
        all_stats = self.get_statistics()
        for func_name, stats in all_stats.items():
            print(f"\nFunction: {func_name}")
            print(f"  Calls:      {stats['call_count']}")
            print(f"  Total time: {stats['total_time']:.3f}s")
            print(f"  Avg time:   {stats['avg_time']:.3f}s")
            print(f"  Min time:   {stats['min_time']:.3f}s")
            print(f"  Max time:   {stats['max_time']:.3f}s")
            print(f"  Median:     {stats['median_time']:.3f}s")
# main.py
from workflows.parallel_workflow import ParallelWorkflow
from utils.performance_monitor import PerformanceMonitor

def main():
    """Demo of parallel processing and performance optimization."""
    print("⚡ Parallel Processing & Performance Optimization System")
    print("🚀 Supports multi-task parallelism, caching, and batch processing")
    print("=" * 55)
    workflow_manager = ParallelWorkflow()
    workflow = workflow_manager.create_workflow()
    monitor = PerformanceMonitor()
    # Predefined query sets for testing
    test_query_sets = {
        "Multi-domain": [
            "What are Python's main features?",
            "Basic concepts of machine learning",
            "What is a REST API?"
        ],
        "Data analysis": [
            "How do I clean a dataset?",
            "What are the common statistical methods?",
            "Best practices for data visualization"
        ],
        "Programming": [
            "What are design patterns?",
            "How do I optimize program performance?",
            "Why is unit testing important?"
        ]
    }
    while True:
        print("\nChoose an action:")
        print("1. Run a predefined parallel-query test")
        print("2. Enter custom parallel queries")
        print("3. Show cache statistics")
        print("4. Show performance report")
        print("5. Clear the cache")
        print("6. Exit")
        choice = input("\nYour choice (1-6): ").strip()
        try:
            if choice == '1':
                print("\n📋 Available test sets:")
                test_sets = list(test_query_sets.keys())
                for i, name in enumerate(test_sets, 1):
                    print(f"{i}. {name}")
                set_num = input("Pick a test set (1-3): ").strip()
                if set_num.isdigit() and 1 <= int(set_num) <= len(test_sets):
                    set_name = test_sets[int(set_num) - 1]
                    queries = test_query_sets[set_name]
                    print(f"\n🧪 Running test: {set_name}")
                    print(f"Number of queries: {len(queries)}")
                    # Run the workflow
                    initial_state = {
                        "user_queries": queries,
                        "parallel_results": {},
                        "aggregated_response": "",
                        "execution_time": 0.0
                    }
                    print("\n" + "=" * 55)
                    # Wrap the invocation so the monitor can time it
                    @monitor.measure_time("parallel_workflow")
                    def run_workflow():
                        return workflow.invoke(initial_state)
                    result = run_workflow()
                    print("=" * 55)
                    print(f"\n{result['aggregated_response']}")
                else:
                    print("❌ Invalid choice")
            elif choice == '2':
                print("\n📝 Custom parallel queries")
                print("Enter one question per line (blank line to finish):")
                custom_queries = []
                while True:
                    query = input().strip()
                    if not query:
                        break
                    custom_queries.append(query)
                if custom_queries:
                    initial_state = {
                        "user_queries": custom_queries,
                        "parallel_results": {},
                        "aggregated_response": "",
                        "execution_time": 0.0
                    }
                    result = workflow.invoke(initial_state)
                    print(f"\n{result['aggregated_response']}")
                else:
                    print("❌ No queries entered")
            elif choice == '3':
                cache_stats = workflow_manager.cache.get_stats()
                print("\n💾 Cache statistics:")
                print(f"Cache size: {cache_stats['cache_size']}")
                print(f"Hits: {cache_stats['hit_count']}")
                print(f"Misses: {cache_stats['miss_count']}")
                print(f"Hit rate: {cache_stats['hit_rate']:.1%}")
            elif choice == '4':
                monitor.print_report()
                executor_stats = workflow_manager.executor.get_stats()
                print("\n⚡ Parallel execution statistics:")
                print(f"Total tasks: {executor_stats['total_tasks']}")
                print(f"Successful: {executor_stats['successful_tasks']}")
                print(f"Failed: {executor_stats['failed_tasks']}")
                print(f"Success rate: {executor_stats['success_rate']:.1%}")
                print(f"Average execution time: {executor_stats['average_time']:.3f}s")
            elif choice == '5':
                workflow_manager.cache.clear_all()
                print("✅ Cache cleared")
            elif choice == '6':
                print("👋 Goodbye!")
                break
            else:
                print("❌ Invalid choice")
        except KeyboardInterrupt:
            print("\n👋 Goodbye!")
            break
        except Exception as e:
            print(f"❌ Error: {e}")

if __name__ == "__main__":
    main()
✅ Multi-threaded parallelism: ThreadPoolExecutor runs multiple tasks concurrently
✅ Smart caching: cuts repeated computation and speeds up responses
✅ Batch processing: handles large volumes of items efficiently
✅ Performance monitoring: tracks execution times as the system runs
✅ Fault tolerance: one failed task doesn't affect the others
Sequential vs. parallel execution:

Sequential (3 queries):
  Query 1: 2.5s
  Query 2: 2.3s
  Query 3: 2.4s
  Total: 7.2s

Parallel (3 queries):
  all three run at once...
  Total: 2.6s

Speedup: ~2.8x (a ~177% improvement) 🚀
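Those numbers come from live model calls, so they'll vary from run to run. Here's a minimal, self-contained sketch that reproduces the same effect with time.sleep standing in for model latency:

# Reproduces the sequential-vs-parallel comparison with simulated latency.
import time
import concurrent.futures

def simulated_query(delay: float) -> float:
    time.sleep(delay)  # stand-in for an LLM call
    return delay

delays = [2.5, 2.3, 2.4]

start = time.time()
for d in delays:
    simulated_query(d)
print(f"Sequential: {time.time() - start:.1f}s")  # ~7.2s

start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
    list(pool.map(simulated_query, delays))
print(f"Parallel:   {time.time() - start:.1f}s")  # ~2.5s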
Today we covered parallel processing and performance-optimization techniques that let the AI assistant handle multiple tasks much faster. Tomorrow we'll explore human-machine collaboration interface design to improve the interaction experience!